Skip to content

KAFKA-19144 Move DelayedProduce to server module #19793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from

Conversation

johnny94
Copy link
Contributor

This PR moves DelayedProduce to the server module. One notable change is that the type of the responseCallback parameter in ReplicaManager#appendRecords() has been changed to a Java Map. Other related type changes have been made accordingly.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker labels May 23, 2025
@johnny94 johnny94 force-pushed the kafka-19144_move_delayed_produce_to_server_module branch from 03e76dc to 9f2951e Compare May 23, 2025 17:04
Copy link
Member

@FrankYang0529 FrankYang0529 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. Leave some minor comments.

Comment on lines 1011 to 1012
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(produceResponseStatus)
responseCallback(produceResponseStatus.asJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use initialProduceStatus to build a Scala map and transfer to Java. Probably, we can build a Java map directly, so we can avoid asJava here.

      val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
      initialProduceStatus.foreach { case (k, status) => produceResponseStatus.put(k, status.responseStatus()) }
      responseCallback(produceResponseStatus)

@@ -1006,7 +1026,7 @@ class ReplicaManager(val config: KafkaConfig,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
)
}
responseCallback(responseStatus)
responseCallback(responseStatus.asJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

    val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
    entries.foreach { case (topicIdPartition, _) =>
      responseStatus.put(topicIdPartition, new PartitionResponse(
        Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
        RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
      ))
    }
    responseCallback(responseStatus)

@github-actions github-actions bot removed the triage PRs from the community label May 25, 2025
@chia7712
Copy link
Member

I will review this PR after #19798 gets merged.

@chia7712
Copy link
Member

@johnny94 please fix the conflicts

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnny94 Thanks for the patch!
I have a few comments. PTAL

def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit = {
val (hasEnough, error) = getPartitionOrError(tp) match {
case Left(err) =>
// Case A
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this is relevant to the comment of DelayedProduce, but it's confusing that these comments are standalone here.
Could you write the whole meaning of these cases, or link these comments to tryComplete ?

partition.checkEnoughReplicasReachOffset(status.requiredOffset)
}

// Case B || C.1 || C.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
val response = responses.get(partition)
assertTrue(response.isDefined)
def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we already have an import alias for java.util.Map, we could reuse it.

Suggested change
def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): Unit = {

@@ -1104,7 +1104,7 @@ class TransactionStateManagerTest {
capturedAppends: mutable.Map[TopicIdPartition, mutable.Buffer[MemoryRecords]]
): Unit = {
val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
val callbackCapture: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit])
val callbackCapture: ArgumentCaptor[java.util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[java.util.Map[TopicIdPartition, PartitionResponse] => Unit])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please import java.util

@@ -2832,7 +2832,7 @@ class KafkaApisTest extends Logging {
any(),
ArgumentMatchers.eq(requestLocal),
any()
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava))
)).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE))))

val response = responses.get(topicIdPartition)
assertTrue(response.isDefined)
def appendCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = {
val response = java.util.Optional.ofNullable(responses.get(topicIdPartition))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already imported java.util.Optional, so we don't need a full-qualified name.

Comment on lines 143 to 146
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition,
key -> METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition()))))
.mark();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit:

Suggested change
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition,
key -> METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition()))))
.mark();
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition,
key -> METRICS_GROUP.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition())))
).mark();

Comment on lines 201 to 209
boolean anyPending = produceMetadata.produceStatus
.values()
.stream()
.anyMatch(ProducePartitionStatus::acksPending);
if (!anyPending) {
return forceComplete();
}

return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
boolean anyPending = produceMetadata.produceStatus
.values()
.stream()
.anyMatch(ProducePartitionStatus::acksPending);
if (!anyPending) {
return forceComplete();
}
return false;
return produceMetadata.produceStatus.values()
.stream()
.findAny()
.map(__ -> false)
.orElseGet(this::forceComplete);

Copy link
Contributor Author

@johnny94 johnny94 May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I think it keeps the idea of original implementation and easier to read. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with it.

Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @johnny94 for this patch, left some comments

Comment on lines +1006 to +1007
val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
initialProduceStatus.foreach { case (k, status) => k -> produceResponseStatus.put(k, status.responseStatus) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
initialProduceStatus.foreach { case (k, status) => k -> produceResponseStatus.put(k, status.responseStatus) }
val produceResponseStatus = initialProduceStatus.map { case (k, status) =>
k -> status.responseStatus
}.asJava

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok to use HashMap instead of asJava conversion.

Comment on lines 275 to 278
val response = Optional.ofNullable(responses.get(partition))

assertTrue(response.isPresent)
result.fire(response.get)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val response = Optional.ofNullable(responses.get(partition))
assertTrue(response.isPresent)
result.fire(response.get)
val response = responses.get(partition)
assertNotNull(response)
result.fire(response)

Comment on lines 2994 to 2996
val response = java.util.Optional.ofNullable(responses.get(topicIdPartition))
assertTrue(response.isPresent)
result.fire(response.get)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val response = java.util.Optional.ofNullable(responses.get(topicIdPartition))
assertTrue(response.isPresent)
result.fire(response.get)
val response = responses.get(topicIdPartition)
assertNotNull(response)
result.fire(response)

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last time, we moved DelayedDeleteRecords to org.apache.kafka.server.purgatory (see 2994e5e). I'm curious, why we chose a different location for DelayedProduce?

LOGGER.trace("Checking produce satisfaction for {}, current status {}", topicIdPartition, status);
// skip those partitions that have already been satisfied
if (status.acksPending) {
// Delegate to `ReplicaManager#maybeAddDelayedProduc`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybeAddDelayedProduc -> maybeAddDelayedProduce

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants